package org.adam2.chamuel.source.collector;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.FileInputStream;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.security.Security;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManager;

import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.NettyTransceiver;
import org.apache.avro.ipc.Responder;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.Configurables;
import org.apache.flume.conf.LogPrivacyUtil;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractSource;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.flume.source.avro.AvroSourceProtocol;
import org.apache.flume.source.avro.Status;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.compression.ZlibDecoder;
import org.jboss.netty.handler.codec.compression.ZlibEncoder;
import org.jboss.netty.handler.execution.ExecutionHandler;
import org.jboss.netty.handler.ipfilter.IpFilterRule;
import org.jboss.netty.handler.ipfilter.IpFilterRuleHandler;
import org.jboss.netty.handler.ipfilter.PatternRule;
import org.jboss.netty.handler.ssl.SslHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flume source that exposes the {@link AvroSourceProtocol} RPC interface through a Netty server
 * and hands received events to this source's channel processor.
 *
 * <p>Single events received through {@link #append(AvroFlumeEvent)} are forwarded unchanged.
 * Batches received through {@link #appendBatch(List)} are merged into one event whose body is the
 * {@code List.toString()} rendering of all record bodies decoded as UTF-8.
 *
 * <p>Optional deflate compression, SSL and IP filtering of the Netty pipeline are driven by the
 * configuration keys declared at the top of the class.
 */
public class StockRecordCollectorSource extends AbstractSource implements EventDrivenSource, Configurable, AvroSourceProtocol {
    private static final Logger logger = LoggerFactory.getLogger(StockRecordCollectorSource.class);

    // Configuration keys read from the Flume context in configure().
    private static final String THREADS = "threads";
    private static final String PORT_KEY = "port";
    private static final String BIND_KEY = "bind";
    private static final String COMPRESSION_TYPE = "compression-type";
    private static final String SSL_KEY = "ssl";
    private static final String IP_FILTER_KEY = "ipFilter";
    private static final String IP_FILTER_RULES_KEY = "ipFilterRules";
    private static final String KEYSTORE_KEY = "keystore";
    private static final String KEYSTORE_PASSWORD_KEY = "keystore-password";
    private static final String KEYSTORE_TYPE_KEY = "keystore-type";
    private static final String EXCLUDE_PROTOCOLS = "exclude-protocols";

    /** Protocol that is always added to the exclusion list, whatever the configuration says. */
    private static final String SSL_V3 = "SSLv3";

    /** Batches containing fewer records than this are not forwarded by {@link #appendBatch(List)}. */
    private static final int MIN_BATCH_SIZE = 10;

    private int port;
    private String bindAddress;
    private String compressionType;
    private String keystore;
    private String keystorePassword;
    private String keystoreType;
    private final List<String> excludeProtocols = new LinkedList<>();
    private boolean enableSsl = false;
    private boolean enableIpFilter;
    private String patternRuleConfigDefinition;
    private NioServerSocketChannelFactory socketChannelFactory;
    private Server server;
    private SourceCounter sourceCounter;
    private int maxThreads;
    private ScheduledExecutorService connectionCountUpdater;
    private List<IpFilterRule> rules;

    public StockRecordCollectorSource() {
    }

    /**
     * Reads the source settings from the Flume context.
     *
     * @param context configuration; {@code port} and {@code bind} are required
     * @throws FlumeException if SSL is enabled and the keystore cannot be loaded, or if the IP
     *         filter is enabled without (valid) {@code ipFilterRules}
     * @throws NullPointerException if SSL is enabled and {@code keystore} or
     *         {@code keystore-password} is missing
     */
    public void configure(Context context) {
        Configurables.ensureRequiredNonNull(context, new String[]{PORT_KEY, BIND_KEY});
        this.port = context.getInteger(PORT_KEY);
        this.bindAddress = context.getString(BIND_KEY);
        this.compressionType = context.getString(COMPRESSION_TYPE, "none");

        try {
            this.maxThreads = context.getInteger(THREADS, 0);
        } catch (NumberFormatException e) {
            // Keep the previous value and carry on; the placeholder makes the bad value visible.
            logger.warn("StockRecordCollectorSource source's \"threads\" property must specify an integer value. Got: {}",
                    context.getString(THREADS));
        }

        this.enableSsl = context.getBoolean(SSL_KEY, false);
        this.keystore = context.getString(KEYSTORE_KEY);
        this.keystorePassword = context.getString(KEYSTORE_PASSWORD_KEY);
        this.keystoreType = context.getString(KEYSTORE_TYPE_KEY, "JKS");

        // Start from a clean list so that calling configure() again does not pile up duplicates.
        this.excludeProtocols.clear();
        String excludeProtocolsStr = context.getString(EXCLUDE_PROTOCOLS);
        if (excludeProtocolsStr != null) {
            this.excludeProtocols.addAll(Arrays.asList(excludeProtocolsStr.split(" ")));
        }
        if (!this.excludeProtocols.contains(SSL_V3)) {
            this.excludeProtocols.add(SSL_V3);
        }

        if (this.enableSsl) {
            Preconditions.checkNotNull(this.keystore, "keystore must be specified when SSL is enabled");
            Preconditions.checkNotNull(this.keystorePassword, "keystore-password must be specified when SSL is enabled");

            try {
                // Loaded only to validate the settings early; the result is not kept.
                loadKeyStore(this.keystore, this.keystorePassword, this.keystoreType);
            } catch (Exception e) {
                throw new FlumeException("StockRecordCollectorSource source configured with invalid keystore: " + this.keystore, e);
            }
        }

        this.enableIpFilter = context.getBoolean(IP_FILTER_KEY, false);
        if (this.enableIpFilter) {
            this.patternRuleConfigDefinition = context.getString(IP_FILTER_RULES_KEY);
            if (this.patternRuleConfigDefinition == null || this.patternRuleConfigDefinition.trim().isEmpty()) {
                throw new FlumeException("ipFilter is configured with true but ipFilterRules is not defined: ");
            }

            String[] patternRuleDefinitions = this.patternRuleConfigDefinition.split(",");
            this.rules = new ArrayList<>(patternRuleDefinitions.length);
            for (String patternRuleDefinition : patternRuleDefinitions) {
                this.rules.add(this.generateRule(patternRuleDefinition));
            }
        }

        if (this.sourceCounter == null) {
            this.sourceCounter = new SourceCounter(this.getName());
        }
    }

    /**
     * Opens the keystore file, loads it and closes the file again.
     *
     * @param path     location of the keystore file
     * @param password keystore password
     * @param type     keystore type passed to {@link KeyStore#getInstance(String)}
     * @return the loaded keystore
     * @throws Exception if the type is unknown, the file cannot be read or the content is invalid
     */
    private static KeyStore loadKeyStore(String path, String password, String type) throws Exception {
        KeyStore ks = KeyStore.getInstance(type);
        try (FileInputStream in = new FileInputStream(path)) {
            ks.load(in, password.toCharArray());
        }
        return ks;
    }

    /**
     * Creates and starts the Netty server and schedules a task that publishes the number of
     * active connections to the source counter every 60 seconds.
     *
     * @throws FlumeException if the Netty server cannot be initialised
     */
    public void start() {
        logger.info("Starting {}...", this);

        try {
            Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);
            this.socketChannelFactory = this.initSocketChannelFactory();
            ChannelPipelineFactory pipelineFactory = this.initChannelPipelineFactory();
            this.server = new NettyServer(responder, new InetSocketAddress(this.bindAddress, this.port),
                    this.socketChannelFactory, pipelineFactory, (ExecutionHandler) null);
        } catch (ChannelException e) {
            logger.error("StockRecordCollectorSource source {} startup failed. Cannot initialize Netty server", this.getName(), e);
            this.stop();
            throw new FlumeException("Failed to set up server socket", e);
        }

        this.connectionCountUpdater = Executors.newSingleThreadScheduledExecutor();
        this.server.start();
        this.sourceCounter.start();
        super.start();
        final NettyServer srv = (NettyServer) this.server;
        this.connectionCountUpdater.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                sourceCounter.setOpenConnectionCount(
                        Long.valueOf(srv.getNumActiveConnections()));
            }
        }, 0, 60, TimeUnit.SECONDS);
        logger.info("StockRecordCollectorSource source {} started.", this.getName());
    }

    /**
     * Builds the Netty channel factory. The boss pool is always a cached pool; the I/O worker pool
     * is a fixed pool of {@code maxThreads} threads when that value is positive, otherwise cached.
     */
    private NioServerSocketChannelFactory initSocketChannelFactory() {
        String namePrefix = "StockRecordCollectorSource " + NettyTransceiver.class.getSimpleName();
        ExecutorService bossExecutor = Executors.newCachedThreadPool(
                new ThreadFactoryBuilder().setNameFormat(namePrefix + " Boss-%d").build());

        ExecutorService workerExecutor;
        if (this.maxThreads <= 0) {
            workerExecutor = Executors.newCachedThreadPool(
                    new ThreadFactoryBuilder().setNameFormat(namePrefix + "  I/O Worker-%d").build());
        } else {
            workerExecutor = Executors.newFixedThreadPool(this.maxThreads,
                    new ThreadFactoryBuilder().setNameFormat(namePrefix + "  I/O Worker-%d").build());
        }

        return new NioServerSocketChannelFactory(bossExecutor, workerExecutor);
    }

    /**
     * Chooses the pipeline factory: a plain pipeline when neither compression, SSL nor IP
     * filtering is enabled, otherwise an {@link AdvancedChannelPipelineFactory}.
     */
    private ChannelPipelineFactory initChannelPipelineFactory() {
        boolean enableCompression = this.compressionType.equalsIgnoreCase("deflate");
        if (!enableCompression && !this.enableSsl && !this.enableIpFilter) {
            return Channels.pipelineFactory(Channels.pipeline());
        }
        return new AdvancedChannelPipelineFactory(enableCompression, this.enableSsl, this.keystore,
                this.keystorePassword, this.keystoreType, this.enableIpFilter, this.patternRuleConfigDefinition);
    }

    /**
     * Closes the server, releases the channel factory, stops the counter and cancels the
     * connection-count task. Safe to call when start-up failed half way.
     */
    public void stop() {
        logger.info("StockRecordCollectorSource source {} stopping: {}", this.getName(), this);
        if (this.server != null) {
            this.server.close();

            try {
                this.server.join();
                this.server = null;
            } catch (InterruptedException e) {
                logger.info("StockRecordCollectorSource source " + this.getName() + ": Interrupted while waiting for StockRecordCollectorSource server to stop. Exiting. Exception follows.", e);
                Thread.currentThread().interrupt();
            }
        }

        if (this.socketChannelFactory != null) {
            this.socketChannelFactory.releaseExternalResources();
            this.socketChannelFactory = null;
        }

        // The counter only exists once configure() has run.
        if (this.sourceCounter != null) {
            this.sourceCounter.stop();
        }
        if (this.connectionCountUpdater != null) {
            this.connectionCountUpdater.shutdownNow();
            this.connectionCountUpdater = null;
        }

        super.stop();
        logger.info("StockRecordCollectorSource source {} stopped. Metrics: {}", this.getName(), this.sourceCounter);
    }

    @Override
    public String toString() {
        return "StockRecordCollectorSource source " + this.getName() + ": { bindAddress: " + this.bindAddress + ", port: " + this.port + " }";
    }

    /** Copies a {@code CharSequence} keyed map into a new {@code String} keyed map. */
    private static Map<String, String> toStringMap(Map<CharSequence, CharSequence> charSeqMap) {
        Map<String, String> stringMap = new HashMap<>();
        for (Entry<CharSequence, CharSequence> entry : charSeqMap.entrySet()) {
            stringMap.put(entry.getKey().toString(), entry.getValue().toString());
        }
        return stringMap;
    }

    /**
     * Forwards one Avro event to the channel processor.
     *
     * @param avroEvent event received over RPC
     * @return {@link Status#OK} when the channel accepted the event, {@link Status#FAILED} when
     *         the channel processor threw a Flume {@code ChannelException}
     */
    public Status append(AvroFlumeEvent avroEvent) {
        if (logger.isDebugEnabled()) {
            if (LogPrivacyUtil.allowLogRawData()) {
                logger.debug("StockRecordSourceStockRecordSource source {}: Received StockRecordCollectorSource event: {}", this.getName(), avroEvent);
            } else {
                logger.debug("StockRecordCollectorSource source {}: Received StockRecordCollectorSource event", this.getName());
            }
        }

        this.sourceCounter.incrementAppendReceivedCount();
        this.sourceCounter.incrementEventReceivedCount();
        Event event = EventBuilder.withBody(avroEvent.getBody().array(), toStringMap(avroEvent.getHeaders()));

        try {
            this.getChannelProcessor().processEvent(event);
        } catch (org.apache.flume.ChannelException e) {
            logger.warn("Avro source " + this.getName() + ": Unable to process event. Exception follows.", e);
            return Status.FAILED;
        }

        this.sourceCounter.incrementAppendAcceptedCount();
        this.sourceCounter.incrementEventAcceptedCount();
        return Status.OK;
    }

    /**
     * Merges a batch of Avro events into a single Flume event and forwards it.
     *
     * <p>Every body is decoded as UTF-8; the merged body is the {@code List.toString()} form of
     * those strings, encoded as UTF-8 again. The merged event carries the headers of the last
     * event in the batch.
     *
     * <p>NOTE(review): batches with fewer than {@value #MIN_BATCH_SIZE} events are not forwarded
     * at all, yet the call still reports {@link Status#OK} and counts the events as accepted.
     * This mirrors the previous behaviour; confirm whether dropping small batches is intended.
     *
     * @param events events received over RPC
     * @return {@link Status#FAILED} when forwarding threw a non-{@code Error} throwable,
     *         otherwise {@link Status#OK}
     */
    public Status appendBatch(List<AvroFlumeEvent> events) {
        logger.debug("Avro source {}: Received avro event batch of {} events.", this.getName(), events.size());
        this.sourceCounter.incrementAppendBatchReceivedCount();
        this.sourceCounter.addToEventReceivedCount((long) events.size());

        List<String> records = new ArrayList<>(events.size());
        Map<CharSequence, CharSequence> lastHeaders = null;
        for (AvroFlumeEvent avroEvent : events) {
            lastHeaders = avroEvent.getHeaders();
            records.add(new String(avroEvent.getBody().array(), StandardCharsets.UTF_8));
        }

        if (records.size() >= MIN_BATCH_SIZE) {
            Event merged = EventBuilder.withBody(
                    records.toString().getBytes(StandardCharsets.UTF_8), toStringMap(lastHeaders));
            // Per-call list: RPC handlers may run concurrently, and a failed batch must not leak
            // into the next call. Kept mutable in case interceptors edit the list in place.
            List<Event> batch = new ArrayList<>(1);
            batch.add(merged);
            try {
                this.getChannelProcessor().processEventBatch(batch);
            } catch (Throwable t) {
                logger.error("Avro source " + this.getName() + ": Unable to process event batch. Exception follows.", t);
                if (t instanceof Error) {
                    throw (Error) t;
                }
                return Status.FAILED;
            }
        } else {
            logger.warn("Avro source {}: batch of {} events is below the minimum of {} and was not forwarded.",
                    this.getName(), records.size(), MIN_BATCH_SIZE);
        }

        this.sourceCounter.incrementAppendBatchAcceptedCount();
        this.sourceCounter.addToEventAcceptedCount((long) events.size());
        return Status.OK;
    }

    /**
     * Parses one rule of the form {@code <allow|deny>:<ip|name>:<pattern>} into a Netty
     * {@link PatternRule}.
     *
     * @param patternRuleDefinition rule text; surrounding whitespace is ignored
     * @return the rule, allowing or denying the pattern
     * @throws FlumeException if the text does not have the expected form
     */
    private PatternRule generateRule(String patternRuleDefinition) throws FlumeException {
        patternRuleDefinition = patternRuleDefinition.trim();

        int firstColonIndex = patternRuleDefinition.indexOf(":");
        if (firstColonIndex == -1) {
            throw invalidRule(patternRuleDefinition);
        }

        String ruleAccessFlag = patternRuleDefinition.substring(0, firstColonIndex);
        int secondColonIndex = patternRuleDefinition.indexOf(":", firstColonIndex + 1);
        boolean knownAccessFlag = ruleAccessFlag.equals("allow") || ruleAccessFlag.equals("deny");
        if (!knownAccessFlag || secondColonIndex == -1) {
            throw invalidRule(patternRuleDefinition);
        }

        String patternTypeFlag = patternRuleDefinition.substring(firstColonIndex + 1, secondColonIndex);
        if (!patternTypeFlag.equals("ip") && !patternTypeFlag.equals("name")) {
            throw invalidRule(patternRuleDefinition);
        }

        boolean isAllow = ruleAccessFlag.equals("allow");
        String patternRuleString = (patternTypeFlag.equals("ip") ? "i" : "n") + ":" + patternRuleDefinition.substring(secondColonIndex + 1);
        logger.info("Adding ipFilter PatternRule: " + (isAllow ? "Allow" : "deny") + " " + patternRuleString);
        return new PatternRule(isAllow, patternRuleString);
    }

    /** Builds the exception reported for a malformed ipFilter rule. */
    private static FlumeException invalidRule(String patternRuleDefinition) {
        return new FlumeException("Invalid ipFilter patternRule '" + patternRuleDefinition + "' should look like <'allow'  or 'deny'>:<'ip' or 'name'>:<pattern>");
    }

    /**
     * Pipeline factory used when compression, SSL or IP filtering is enabled. Handlers are added
     * with {@code addFirst} in the order compression, SSL, IP filter.
     */
    private class AdvancedChannelPipelineFactory implements ChannelPipelineFactory {
        private final boolean enableCompression;
        private final boolean enableSsl;
        private final String keystore;
        private final String keystorePassword;
        private final String keystoreType;
        private final boolean enableIpFilter;
        private final String patternRuleConfigDefinition;

        public AdvancedChannelPipelineFactory(boolean enableCompression, boolean enableSsl, String keystore, String keystorePassword, String keystoreType, boolean enableIpFilter, String patternRuleConfigDefinition) {
            this.enableCompression = enableCompression;
            this.enableSsl = enableSsl;
            this.keystore = keystore;
            this.keystorePassword = keystorePassword;
            this.keystoreType = keystoreType;
            this.enableIpFilter = enableIpFilter;
            this.patternRuleConfigDefinition = patternRuleConfigDefinition;
        }

        /**
         * Builds a TLS server context from the configured keystore.
         *
         * @throws Error if the keystore cannot be loaded or the context cannot be initialised
         */
        private SSLContext createServerSSLContext() {
            try {
                // The helper closes the keystore file; this runs for every pipeline that is built.
                KeyStore ks = loadKeyStore(this.keystore, this.keystorePassword, this.keystoreType);
                KeyManagerFactory kmf = KeyManagerFactory.getInstance(this.getAlgorithm());
                kmf.init(ks, this.keystorePassword.toCharArray());
                SSLContext serverContext = SSLContext.getInstance("TLS");
                serverContext.init(kmf.getKeyManagers(), (TrustManager[]) null, (SecureRandom) null);
                return serverContext;
            } catch (Exception e) {
                throw new Error("Failed to initialize the server-side SSLContext", e);
            }
        }

        /** Returns the configured key manager algorithm, or {@code SunX509} when none is set. */
        private String getAlgorithm() {
            String algorithm = Security.getProperty("ssl.KeyManagerFactory.algorithm");
            return algorithm != null ? algorithm : "SunX509";
        }

        /** Creates a new pipeline holding the handlers for the enabled features. */
        public ChannelPipeline getPipeline() throws Exception {
            ChannelPipeline pipeline = Channels.pipeline();
            if (this.enableCompression) {
                ZlibEncoder encoder = new ZlibEncoder(6);
                pipeline.addFirst("deflater", encoder);
                pipeline.addFirst("inflater", new ZlibDecoder());
            }

            if (this.enableSsl) {
                SSLEngine sslEngine = this.createServerSSLContext().createSSLEngine();
                sslEngine.setUseClientMode(false);

                // Keep every protocol the engine enables by default except the excluded ones.
                List<String> enabledProtocols = new ArrayList<>();
                for (String protocol : sslEngine.getEnabledProtocols()) {
                    if (!StockRecordCollectorSource.this.excludeProtocols.contains(protocol)) {
                        enabledProtocols.add(protocol);
                    }
                }

                sslEngine.setEnabledProtocols(enabledProtocols.toArray(new String[0]));
                logger.info("SSLEngine protocols enabled: " + Arrays.asList(sslEngine.getEnabledProtocols()));
                pipeline.addFirst("ssl", new SslHandler(sslEngine));
            }

            if (this.enableIpFilter) {
                logger.info("Setting up ipFilter with the following rule definition: " + this.patternRuleConfigDefinition);
                IpFilterRuleHandler ipFilterHandler = new IpFilterRuleHandler();
                ipFilterHandler.addAll(StockRecordCollectorSource.this.rules);
                logger.info("Adding ipFilter with " + ipFilterHandler.size() + " rules");
                pipeline.addFirst("ipFilter", ipFilterHandler);
            }

            return pipeline;
        }
    }
}
